package com.vf.cloud.server.signalling.handler;

import java.util.Map;
import com.alibaba.fastjson.JSONObject;
import com.jfinal.kit.StrKit;
import com.vf.cloud.common.util.RequestParser;
import com.vf.cloud.server.adapter.AdapterServer;
import com.vf.cloud.server.signalling.SignallingServer;
import com.vf.cloud.server.signalling.pool.CachePool;
import com.vf.cloud.server.signalling.util.SignallingUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SignallingChannelInboundHandlerHandler extends SimpleChannelInboundHandler<Object> {

	/**
	 * Called when a channel becomes active. The connection is kept only while
	 * {@code SignallingServer.getInstance().getStatus()} equals 2; for any other
	 * status the channel is closed immediately.
	 * <p>
	 * NOTE(review): status 2 presumably means "running" — confirm against
	 * SignallingServer.
	 *
	 * @param ctx context of the newly active channel
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {

		// Parameterized logging: the remote address must never be used as a format
		// string, because an address containing '%' (e.g. an IPv6 zone id such as
		// "%eth0") would make String.format throw.
		if (2 == SignallingServer.getInstance().getStatus()) {
			log.info("Signalling is connected:{}", ctx.channel().remoteAddress());
		} else {
			log.info("Signalling : {},Now close Channel:{}", SignallingServer.getInstance().getStatus(),
					ctx.channel().remoteAddress());
			ctx.close();
		}

	}

	/**
	 * Called when the channel goes inactive; reports the channel's long-text id
	 * to {@code SignallingUtil.disconnected}.
	 *
	 * @param ctx context of the channel that became inactive
	 */
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		final String channelId = ctx.channel().id().asLongText();
		SignallingUtil.disconnected(channelId);
	}

	/**
	 * Flushes the channel once the current read batch has been processed.
	 *
	 * @param ctx context of the channel whose read completed
	 */
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		final Channel channel = ctx.channel();
		channel.flush();
	}

	/**
	 * Logs the failure and closes the channel. Without the log statement every
	 * error raised while handling a message would disappear silently.
	 *
	 * @param ctx   context of the failing channel
	 * @param cause the exception that reached this handler
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		// The trailing Throwable argument is rendered by SLF4J as a stack trace.
		log.error("Signalling channel {} failed, closing it.", ctx.channel().remoteAddress(), cause);
		ctx.close();
	}

	/**
	 * Dispatches an inbound message: a {@link FullHttpRequest} goes to
	 * {@link #handleHttpRequest}, anything else is treated as a WebSocket frame.
	 * Messages are ignored while the signalling server status is not 2.
	 *
	 * @param ctx channel context
	 * @param msg inbound message
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
		final boolean accepting = 2 == SignallingServer.getInstance().getStatus();
		if (!accepting) {
			return;
		}

		if (!(msg instanceof FullHttpRequest)) {
			handlerWebSocketFrame(ctx, msg);
			return;
		}

		handleHttpRequest(ctx, (FullHttpRequest) msg);
	}

	/**
	 * Handshaker created while upgrading this channel to WebSocket; stays
	 * {@code null} until an upgrade request with a supported version arrives.
	 */
	WebSocketServerHandshaker handshaker;

	/**
	 * Handles the initial HTTP request: validates it, performs the WebSocket
	 * handshake and then registers the peer. A URI with a query string is
	 * treated as a player, a URI without one as a streamer (UE).
	 *
	 * @param ctx channel context
	 * @param req the HTTP upgrade request
	 */
	protected void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
		// The Upgrade header value is case-insensitive (RFC 6455, section 4.2.1),
		// so clients sending "WebSocket" must be accepted as well.
		if (!req.decoderResult().isSuccess() || !"websocket".equalsIgnoreCase(req.headers().get("Upgrade"))) {
			sendHttpResponse(ctx, req,
					new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
			return;
		}

		String uri = req.uri();
		WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
				"ws://" + req.headers().get(HttpHeaderNames.HOST) + uri, null, false);
		handshaker = wsFactory.newHandshaker(req);
		if (handshaker == null) {
			// No WebSocket connection was established, so the peer must not be
			// registered as a player or streamer.
			WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
			return;
		}
		handshaker.handshake(ctx.channel(), req);

		if (uri.contains("?")) {
			registerPlayer(ctx, req);
		} else {
			registerStreamer(ctx, uri);
		}

	}

	/**
	 * Registers a player connection identified by the "EIO" request parameter,
	 * sends it the configuration and notifies the streamer that it connected.
	 */
	private void registerPlayer(ChannelHandlerContext ctx, FullHttpRequest req) {
		log.info("Player comming!");
		try {
			Map<String, String> param = new RequestParser(req).parse();
			if (!param.containsKey("EIO")) {
				SignallingUtil.sendEvent(ctx, 5000, "参数有误.");
				ctx.close();
				return;
			}
			String EIO = param.get("EIO");

			if (!CachePool.isEnabled(EIO)) {
				log.info("资源无效，请重新请求.");
				ctx.close();
				return;
			}

			CachePool.addPlayer(EIO, ctx);
			SignallingUtil.sendConfig(ctx);

			JSONObject json = new JSONObject();
			json.put("type", "playerConnected");
			json.put("playerId", EIO);
			json.put("dataChannel", true);
			json.put("sfu", false);
			SignallingUtil.sendFromPlayerToUE(EIO, json);

		} catch (Exception e) {
			// Keep the full stack trace on the server; the client only gets the message.
			log.error("Failed to register player on channel {}", ctx.channel().remoteAddress(), e);
			SignallingUtil.sendEvent(ctx, 5000, e.getMessage());
			ctx.close();
		}
	}

	/**
	 * Registers a streamer (UE) connection identified by the first path segment
	 * of the request URI and sends it the configuration.
	 */
	private void registerStreamer(ChannelHandlerContext ctx, String uri) {
		// split(.., -1) always yields at least one element, so the id has to be
		// checked for blankness rather than the array for emptiness.
		String[] params = uri.replaceFirst("\\/", "").split("\\/", -1);
		String EIO = params[0];
		if (StrKit.isBlank(EIO)) {
			log.info("Streamer connected without an id in the request path, closing channel.");
			ctx.close();
			return;
		}

		CachePool.addUE(EIO, ctx);

		if (!AdapterServer.getInstance().hasGPU(EIO)) {
			log.info("GPU资源重置，关闭渲染服务.");
			ctx.close();
			return;
		}

		SignallingUtil.sendConfig(ctx);
	}

	/**
	 * Writes an HTTP response to the channel. For any status other than 200 the
	 * numeric status code is used as the response body. The connection is closed
	 * after the write when the request is not keep-alive or the status is not 200.
	 *
	 * @param ctx channel context
	 * @param req the request being answered (consulted for keep-alive)
	 * @param res the response to send; its content and headers are modified
	 */
	public static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
		boolean ok = res.status().code() == 200;
		if (!ok) {
			ByteBuf buf = Unpooled.copiedBuffer(res.status().code() + "", CharsetUtil.UTF_8);
			try {
				res.content().writeBytes(buf);
			} finally {
				buf.release();
			}
		}
		HttpUtil.setContentLength(res, res.content().readableBytes());

		// Keep-alive is a property of the request message. The previous cast of
		// req.protocolVersion() (an HttpVersion) to HttpMessage always failed with a
		// ClassCastException, so no response was ever written.
		boolean close = !HttpUtil.isKeepAlive(req) || !ok;

		// Always write the response; only closing the connection is conditional.
		ChannelFuture future = ctx.channel().writeAndFlush(res);
		if (close) {
			future.addListener(ChannelFutureListener.CLOSE);
		}
	}

	/**
	 * Handles a WebSocket frame: completes the close handshake, answers pings
	 * with pongs and routes JSON text messages according to the client type that
	 * {@code CachePool} has recorded for this channel ("UE" or "EIO"). Other frame
	 * types and channels of unknown type are ignored.
	 *
	 * @param ctx   channel context
	 * @param frame the inbound frame
	 */
	private void handlerWebSocketFrame(ChannelHandlerContext ctx, Object frame) {
		// Close instruction: finish the close handshake.
		if (frame instanceof CloseWebSocketFrame) {
			if (handshaker == null) {
				// No handshake was recorded for this channel; just drop the connection.
				ctx.close();
				return;
			}
			// retain(): the frame is released again once channelRead0 returns.
			handshaker.close(ctx.channel(), (CloseWebSocketFrame) ((CloseWebSocketFrame) frame).retain());
			return;
		}
		// Ping: RFC 6455 (section 5.5.2) requires a Pong carrying the same payload.
		if (frame instanceof PingWebSocketFrame) {
			ctx.channel().writeAndFlush(new PongWebSocketFrame(((PingWebSocketFrame) frame).content().retain()));
			return;
		}

		if (!(frame instanceof TextWebSocketFrame)) {
			return;
		}

		JSONObject rawMsg;
		try {
			rawMsg = JSONObject.parseObject(((TextWebSocketFrame) frame).text());
		} catch (Exception e) {
			log.error(String.format("[%s]>> cannot parse Streamer message:%s", 1008, e));
			ctx.close();
			return;
		}
		if (rawMsg == null) {
			return;
		}

		String clientType = CachePool.getType(ctx.channel().id().asLongText());
		if (StrKit.equals(clientType, "UE")) {
			handleStreamerMessage(ctx, rawMsg);
		} else if (StrKit.equals(clientType, "EIO")) {
			handlePlayerMessage(ctx, rawMsg);
		}
	}

	/**
	 * Handles a JSON message received from a streamer (UE): answers "ping",
	 * forwards offer/answer/iceCandidate to the addressed player, and handles
	 * "disconnectPlayer". Any other type closes the channel.
	 */
	private void handleStreamerMessage(ChannelHandlerContext ctx, JSONObject rawMsg) {
		String type = rawMsg.getString("type");

		if (StrKit.equals(type, "ping")) {
			SignallingUtil.sendPing(ctx, rawMsg.getLong("time"));
			return;
		}

		boolean forward = StrKit.equals(type, "offer") || StrKit.equals(type, "answer")
				|| StrKit.equals(type, "iceCandidate");
		boolean disconnect = StrKit.equals(type, "disconnectPlayer");
		if (!forward && !disconnect) {
			// SLF4J placeholders are "{}"; "%s" is never substituted.
			log.error("unsupported Streamer message type:{}", type);
			ctx.close();
			return;
		}

		Object rawPlayerId = rawMsg.get("playerId");
		String playerId = rawPlayerId == null ? null : rawPlayerId.toString();
		// Strip the routing field itself (the key "playerId", not a key named after
		// its value) before the message is forwarded.
		rawMsg.remove("playerId");
		if (StrKit.isBlank(playerId)) {
			// Previously this ended in a NullPointerException; the message cannot be
			// routed, so it is dropped.
			log.warn("dropped Streamer message of type {}: missing playerId", type);
			return;
		}

		if (forward) {
			SignallingUtil.sendFromUEToPlayer(playerId, rawMsg);
		} else {
			SignallingUtil.disconnectPlayer(playerId);
		}
	}

	/**
	 * Handles a JSON message received from a player: answers "ping", forwards
	 * offer/answer/iceCandidate to the streamer tagged with the player's id, and
	 * logs "stats". Messages from channels without a known player id are ignored.
	 */
	private void handlePlayerMessage(ChannelHandlerContext ctx, JSONObject rawMsg) {
		String type = rawMsg.getString("type");
		if (StrKit.equals(type, "ping")) {
			SignallingUtil.sendPing(ctx, rawMsg.getLong("time"));
			return;
		}

		String playerId = CachePool.getId(ctx.channel().id().asLongText());
		if (StrKit.isBlank(playerId)) {
			return;
		}

		if (StrKit.equals(type, "offer") || StrKit.equals(type, "answer") || StrKit.equals(type, "iceCandidate")) {
			rawMsg.put("playerId", playerId);
			SignallingUtil.sendFromPlayerToUE(playerId, rawMsg);
		} else if (StrKit.equals(type, "stats")) {
			// NOTE(review): the literal "\\n" and "${" look like leftovers from a
			// JavaScript template string; kept unchanged to preserve the log output.
			log.info(String.format("player {%s}: stats\\n${%s}", playerId, rawMsg));
		} else {
			log.error(String.format("player {%s}: unsupported message type: {%s}", playerId, type));
		}
	}

	/**
	 * Sends a heartbeat (ping) to the client whenever an idle-state event fires,
	 * whichever idle state (reader, writer or all) it reports. Other user events
	 * are ignored.
	 *
	 * @param ctx channel context
	 * @param evt the user event
	 * @throws Exception declared by the overridden method
	 */
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		if (!(evt instanceof IdleStateEvent)) {
			return;
		}

		final IdleStateEvent idleEvent = (IdleStateEvent) evt;
		switch (idleEvent.state()) {
		case READER_IDLE:
		case WRITER_IDLE:
		case ALL_IDLE:
			SignallingUtil.sendPing(ctx, System.currentTimeMillis());
			break;
		default:
			break;
		}
	}

	/**
	 * https://docs.unrealengine.com/4.26/zh-CN/SharingAndReleasing/PixelStreaming/PixelStreamingReference/
	 * 
	 * @param project
	 * @param EIO
	 * @param sid
	 * @param width
	 * @param height
	 * @return
	 * @throws IOException
	 */
//	private int executeCmd(Project project, String EIO, String sid, int width, int height,int graphicsAdapter) throws IOException {
//		try {
//			Runtime runtime = Runtime.getRuntime();
//			
////			String cmd =String.format("cmd /c start /b %s -RenderOffScreen,-AudioMixer,-log,-AllowPixelStreamingCommands,-ForceRes,ResX=%s,ResY=%s,"
////					+ "-PixelStreamingPort=%s,-GraphicsAdapter=%s,-AppId=padus6dypelx1k,-key=62372722,-PixelStreamingIP=%s,-NvEncFrameRateNum=%s,-PixelStreamingHideCursor=true",
////					
////					project.getExeFilePath(),project.getResX(),project.getResY()),Cache.signalling.getSignalingInnerPort(),graphicsAdapter
////			
////			
//			String cmd = String.format(
//					"cmd /c start /b %s -ForceRes -ResX=%s -ResY=%s -PixelStreamingURL=ws://%s:%s/%s/%s -NvEncFrameRateNum=%s -PixelStreamingHideCursor=true  -RenderOFFscreen",
//					project.getExeFilePath(), width, height,
//					Cache.signalling.getSignalingInnerIP(), Cache.signalling.getSignalingInnerPort(), sid, EIO,
//					project.getFrameRate());
//			Process process = runtime.exec(cmd);
//			new Thread(new ConsoleThread(new BufferedReader(new InputStreamReader(process.getInputStream(), "UTF-8"))))
//					.start();
//			return process.waitFor();
//		} catch (InterruptedException e) {
//		}
//		return -1;
//	}
//
//	class ConsoleThread implements Runnable {
//
//		private BufferedReader bfr = null;
//
//		public ConsoleThread(BufferedReader bfr) {
//			this.bfr = bfr;
//		}
//
//		@Override
//		public void run() {
//			String line = null;
//			try {
//				while ((line = bfr.readLine()) != null) {
//					log.debug(line);
//				}
//			} catch (IOException e) {
//				e.printStackTrace();
//			}
//		}
//	}

}
